package com.sugo.seckill.mq;


import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.ser.Serializers;
import com.sugo.seckill.error.BaseException;
import com.sugo.seckill.mapper.order.SeckillGoodsMapper;
import com.sugo.seckill.order.service.SeckillOrderService;
import com.sugo.seckill.pojo.TbSeckillGoods;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.redisson.misc.Hash;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

@Component
public class MqProducer {

    private static final String producerGroup = "seckillGroup";
    private static final String namesrvAddr = "172.17.61.90:9876";
    //private DefaultMQProducer producer;

    private TransactionMQProducer producer;


    //注入订单服务
    @Autowired
    private SeckillOrderService orderService;

    //注入
    @Autowired
    private SeckillGoodsMapper seckillGoodsMapper;

    @PostConstruct
    public void initProducer() {
        producer = new TransactionMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setRetryTimesWhenSendFailed(3);
        try {
            producer.start();

            // 注册一个监听器
            producer.setTransactionListener(new TransactionListener() {

                /**
                 * @Description: 执行本地业务代码
                 * @Author: hubin
                 * @CreateDate: 2021/2/4 20:16
                 * @UpdateUser: hubin
                 * @UpdateDate: 2021/2/4 20:16
                 * @UpdateRemark: 修改内容
                 * @Version: 1.0
                 */
                @Override
                public LocalTransactionState executeLocalTransaction(Message message, Object o) {

                    String seckillId = null;
                    try {
                        // 接受消息
                        String msg = new String(message.getBody(),RemotingHelper.DEFAULT_CHARSET);
                        // 把消息转换成map
                        Map<String,String> maps = JSON.parseObject(msg, Map.class);
                        // 获取消息内存
                        seckillId = maps.get("seckillId");
                        String userId = maps.get("userId");

                        // 调用下单的方法
                        orderService.startKilledWithMore(Long.parseLong(seckillId),userId);

                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }catch (BaseException e){
                        // 业务中出现问题，消息事务状态应该是回滚状态
                        // 查询事务状态
                        TbSeckillGoods seckillGoods = seckillGoodsMapper.selectByPrimaryKey(seckillId);
                        seckillGoods.setTransactionStatus(2);
                        seckillGoods.setStockCount(null);

                        // 更新事务状态
                        seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);

                        // 消息事务回滚
                        return LocalTransactionState.ROLLBACK_MESSAGE;


                    }


                    return null;
                }

                /**
                 * @Description: 事务状态回查方法
                 * @Author: hubin
                 * @CreateDate: 2021/2/4 20:16
                 * @UpdateUser: hubin
                 * @UpdateDate: 2021/2/4 20:16
                 * @UpdateRemark: 修改内容
                 * @Version: 1.0
                 */
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

                    try {
                        // 接受消息
                        String msg = new String(messageExt.getBody(),RemotingHelper.DEFAULT_CHARSET);
                        // 把消息转换成map
                        Map<String,String> maps = JSON.parseObject(msg, Map.class);
                        // 获取消息内存
                        String seckillId = maps.get("seckillId");
                        String userId = maps.get("userId");

                        // 根据id查询事务状态
                        TbSeckillGoods seckillGoods = seckillGoodsMapper.selectByPrimaryKey(seckillId);

                        // 获取状态值
                        Integer transactionStatus = seckillGoods.getTransactionStatus();

                        // 根据事务状态，判断是commit ,unkown ,rollback

                        if(transactionStatus == 0){
                            return LocalTransactionState.UNKNOW;
                        }
                        if(transactionStatus == 1){
                            return LocalTransactionState.COMMIT_MESSAGE;
                        }
                        if(transactionStatus == 2){
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                        }


                        // 事务消息确认提交
                        return LocalTransactionState.COMMIT_MESSAGE;


                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }


                    return null;
                }
            });




            System.out.println("[Producer 已启动]");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String send(String topic, String tags, String msg) {
        SendResult result = null;
        try {
            Message message = new Message(topic, tags, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            result = producer.send(message);
            System.out.println("[Producer] msgID(" + result.getMsgId() + ") " + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "{\"MsgId\":\"" + result.getMsgId() + "\"}";
    }

    @PreDestroy
    public void shutDownProducer() {
        if (producer != null) {
            producer.shutdown();
        }
    }

    /**
     * @Description: 发送消息，同步数据库库存
     * @Author: hubin
     * @CreateDate: 2020/10/26 21:59
     * @UpdateUser: hubin
     * @UpdateDate: 2020/10/26 21:59
     * @UpdateRemark: 修改内容
     * @Version: 1.0
     */
    public boolean asncSendMsg(Long seckillId) {
        try {
            Message message = new Message("seckill_goods_asnc_stock", "increase", (seckillId+"").getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送消息
            producer.send(message);
        } catch (Exception e) {
            e.printStackTrace();
            //发送失败
            return false;
        }
        return true;
    }



    /**
     * @Description: 发送消息，使用事务型消息把所有的操作原子化
     * @Author: hubin
     * @CreateDate: 2020/10/26 21:59
     * @UpdateUser: hubin
     * @UpdateDate: 2020/10/26 21:59
     * @UpdateRemark: 修改内容
     * @Version: 1.0
     */
    public boolean asncSendTransactionMsg(Long seckillId,String userId) {
        try {

            Map<String,String> maps = new HashMap<>();
            maps.put("seckillId",seckillId+"");
            maps.put("userId",userId);

            //把对象转换为字符串
            String jsonStr = JSON.toJSONString(maps);

            Message message = new Message("seckill_goods_asnc_stock", "increase", jsonStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送事务消息
           producer.sendMessageInTransaction(message,null);
        } catch (Exception e) {
            e.printStackTrace();
            //发送失败
            return false;
        }
        return true;
    }
}
